Reinforcement Learning with Policy Gradients using PyTorch


1. What reinforcement learning is all about?


rl loop

An RL algorithm seeks to maximize the agent’s expected return (total future reward), given a previously unknown environment, through a trial-and-error learning process.

Our solution will be a policy. policy

Deep Reinforcement Learning

Solve RL problems through deep learning.


All of them seek to maximize expected return but in different ways.

Policy Gradients

Expected return:

$$ J\left( \mathbf{\theta}\right) =\mathbf{E}\left[ \sum\nolimits_{t=0}^{T}R_{t}\right] \\ R_t - \text{random variable representing reward reached at time } t \\ \text{ following policy } \pi \text{ from some initial state} \\ T - \text{final time step or end of the episode} $$

All we do is finding gradient estimate of expected return to do stochastic gradient ascend update!


If the gradient estimate is unbiased and learning rates fulfill $\sum\textstyle_{h=0}^{\infty}\alpha_{h}>0$ and $\sum\textstyle_{h=0}^{\infty}\alpha_{h}^{2}=\textrm{const}\ ,$ the learning process is guaranteed to converge at least to a local minimum.

2. PG Pong

Environment (OpenAI Gym)

In [ ]:
import gym

env = gym.make('Pong-v0').unwrapped
observation = env.reset()

while True:
    observation, reward, done, _ = env.step(action)
    # Record reward for future training
    reward_sum += reward

Input (preprocessing)

In [ ]:
def preprocess(img):
    """ Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
    I = img[35:195]     # crop
    I = I[::2, ::2, 0]  # downsample by factor of 2
    I[I == 144] = 0     # erase background (background type 1)
    I[I == 109] = 0     # erase background (background type 2)
    I[I != 0] = 1       # everything else (paddles, ball) just set to 1

    return I.astype(np.float).ravel()


In [ ]:
class PolicyGradient(nn.Module):
    It's out model class.

    def __init__(self, in_dim):
        super(PolicyGradient, self).__init__()
        self.hidden = nn.Linear(in_dim, 200)
        self.out = nn.Linear(200, 3)

        self.rewards = []
        self.actions = []

        # Weights initialization
        for m in self.modules():
            if isinstance(m, nn.Linear):
                # 'n' is number of inputs to each neuron
                n = len([1])
                # "Xavier" initialization
      , np.sqrt(2. / n))

    def forward(self, x):
        h = F.relu(self.hidden(x))
        logits = self.out(h)
        return F.softmax(logits)

    def reset(self):
        del self.rewards[:]
        del self.actions[:]

Stochastic policy

We use stochastic policy which means our model produces probability distribution over all actions, π(a | s) = probability of action given state. Then we sample from this distribution in order to get action.

Why stochastic policy?:

  • We can use the score function gradient estimator, which tries to make good actions more probable.
  • Stochastic environments.
  • Partially observable states.
  • The randomness inherent in the policy leads to exploration, which is crucial for most learning problems.

In [ ]:
def get_action(policy, observation):
    # Get current state, which is difference between current and previous state
    cur_state = preprocess(observation)
    state = cur_state - get_action.prev_state \
        if get_action.prev_state is not None else np.zeros(len(cur_state))
    get_action.prev_state = cur_state

    var_state = Variable(
        # Make torch FloatTensor from numpy array and add batch dimension
    probabilities = policy(var_state)
    # Stochastic policy: roll a biased dice to get an action
    action = probabilities.multinomial()
    # Record action for future training
    # '+ 1' converts action to valid Pong env action
    return[0, 0] + 1


Supervised Learning

Maximize log likelihood of true label (e.g. cross-entropy error).

Loss: $ \sum_i log p(\text{a } \vert \text{ img}) $

Reinforcement Learning

Maximize log likelihood of good action and minimize it for bad actions (via advantage or on diagram "eventual reward").

Policy Gradients: Run a policy for a while. See what actions led to high rewards. Increase their probability.

Loss: $ \sum_i A_i log p(\text{a } \vert \text{ img}) $

In [ ]:
# Let's play the game ;)
while True:
    ### Here actions are taken in environment ###
    action = get_action(policy, observation)
    observation, reward, done, _ = env.step(action)
    # Record reward for future training
    reward_sum += reward

    ### Here is our reinforcement learning logic ###
    if done:
        num_episodes += 1


        # Reinforce actions
        for action, reward in zip(policy.actions, rewards):

        # BACKPROP!!! (Gradients are accumulated each episode until update)
        autograd.backward(policy.actions, [None for a in policy.actions])

        ### Here we do weight update each batch ###
        if num_episodes % HPARAMS.batch_size == 0:
            print "### Updated parameters! ###"

3. Extras

Discounted reward

In a more general RL setting we would receive some reward $r_t$ at every time step. One common choice is to use a discounted reward, so the “eventual reward” in the diagram above would become:

$$ R_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k} \\ 0 \leq \gamma < 1 $$

But why discounted?

  • We care more about tomorrow than what will be sometime in the distant future.
  • In infinite horizont without discount we would get infinite rewards (infinite in this case means troubles).

Know your limit!...

$$ 0 \leq \gamma < 1 \\ R_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k} \leq \sum_{k=0}^{\infty} \gamma^k R_{max} = \frac{R_{max}}{1 - \gamma} $$

Infinite horizont has finite sum of discounted rewards...

Why is this important?

Maximum Expected Utility (MEU) principle says...

A rational agent should chose the action that maximizes its expected utility given its knowlage.

Expected utility in state s with respect to policy:

$$ U^{\pi}(s) = E[\sum_{t = 0}^{\infty}\gamma^tR(S_t)] \\ S_t - \text{random variable representing state reached at time } t \text{ following policy } \pi $$

...where the expectation is with respect to the probability distribution over state sequences determined by s and π.

Comparing infinities could be problematic...

In [ ]:
# Compute discounted reward
discounted_R = []
running_add = 0
for reward in policy.rewards[::-1]:
    if reward != 0:
        # Reset the sum, since this was a game boundary (pong specific!)
        running_add = 0

        running_add = running_add * HPARAMS.gamma + reward
        # "Further" actions have less discounted rewards
        discounted_R.insert(0, running_add)

    rewards = FloatTensor(discounted_R)
    # Standardize rewards
    rewards = (rewards - rewards.mean()) / \
        (rewards.std() + np.finfo(np.float32).eps)
    # Batch size shouldn't influence update step
    rewards = rewards / HPARAMS.batch_size

Deriving Policy Gradients

Score function gradient estimator

$$ f(x) - \text{scalar valued score function (our reward function).} \\ p(x|\theta) - \text{probability distribution parametrized by } \theta \text{ (our policy network).} $$

We are interested in finding how we should shift the distribution (through its parameters θ) to increase the scores of its samples, as judged by f (i.e. how do we change the network’s parameters so that action samples get higher rewards).

\begin{align} \nabla_{\theta} E_{x \sim p(x | \theta)}[f(x)] &= \nabla_{\theta} \sum_x p(x) f(x) & \text{definition of expectation} \\ & = \sum_x \nabla_{\theta} p(x) f(x) & \text{swap sum and gradient} \\ & = \sum_x p(x) \frac{\nabla_{\theta} p(x)}{p(x)} f(x) & \text{both multiply and divide by } p(x) \\ & = \sum_x p(x) \nabla_{\theta} \log p(x) f(x) & \text{use the fact that } \nabla_{\theta} \log(z) = \frac{1}{z} \nabla_{\theta} z \\ & = E_x[f(x) \nabla_{\theta} \log p(x) ] & \text{definition of expectation} \\ \end{align}$$ f(x) \nabla_{\theta} \log p(x) - \text{unbiased gradient estimator!} $$


Weights visualization


  • Hiperparameters tuning
  • ConvNets
  • Move penalty?

